<!DOCTYPE html
  PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<!-- saved from url=(0014)about:internet -->
<html xmlns:MSHelp="http://www.microsoft.com/MSHelp/" lang="en-us" xml:lang="en-us"><head><meta http-equiv="Content-Type" content="text/html; charset=UTF-8">

<meta name="DC.Type" content="topic">
<meta name="DC.Title" content="Working on the Assembly Line: pipeline">
<meta name="DC.subject" content="Working on the Assembly Line: pipeline">
<meta name="keywords" content="Working on the Assembly Line: pipeline">
<meta name="DC.Relation" scheme="URI" content="../tbb_userguide/Parallelizing_Complex_Loops.htm">
<meta name="DC.Relation" scheme="URI" content="../tbb_userguide/Using_Circular_Buffers.htm">
<meta name="DC.Relation" scheme="URI" content="../tbb_userguide/Throughput_of_pipeline.htm">
<meta name="DC.Relation" scheme="URI" content="../tbb_userguide/Non-Linear_Pipelines.htm">
<meta name="DC.Relation" scheme="URI" content="../tbb_userguide/Summary_of_Loops_and_Pipelines.htm">
<meta name="DC.Format" content="XHTML">
<meta name="DC.Identifier" content="tutorial_Working_on_the_Assembly_Line_pipeline">
<link rel="stylesheet" type="text/css" href="../intel_css_styles.css">
<title>Working on the Assembly Line: pipeline</title>
<xml>
<MSHelp:Attr Name="DocSet" Value="Intel"></MSHelp:Attr>
<MSHelp:Attr Name="Locale" Value="kbEnglish"></MSHelp:Attr>
<MSHelp:Attr Name="TopicType" Value="kbReference"></MSHelp:Attr>
</xml>
</head>
<body id="tutorial_Working_on_the_Assembly_Line_pipeline">
 <!-- ==============(Start:NavScript)================= -->
 <script src="..\NavScript.js" language="JavaScript1.2" type="text/javascript"></script>
 <script language="JavaScript1.2" type="text/javascript">WriteNavLink(1);</script>
 <!-- ==============(End:NavScript)================= -->
<a name="tutorial_Working_on_the_Assembly_Line_pipeline"><!-- --></a>

 
  <h1 class="topictitle1">Working on the Assembly Line: pipeline</h1>
 
   
  <div> 
	 <p><em>Pipelining</em> is a common parallel pattern that mimics a traditional
		manufacturing assembly line. Data flows through a series of pipeline filters
		and each filter processes the data in some way. Given an incoming stream of
		data, some of these filters can operate in parallel, and others cannot. For
		example, in video processing, some operations on frames do not depend on other
		frames, and so can be done on multiple frames at the same time. On the other
		hand, some operations on frames require processing prior frames first. 
	 </p>
 
	 <p>The Intel&reg; Threading Building Blocks (Intel&reg; TBB) classes 
		<samp class="codeph">pipeline</samp> and 
		<samp class="codeph">filter</samp> implement the pipeline pattern. The simple text
		processing example below uses the higher-level function 
		<samp class="codeph">parallel_pipeline</samp> and class template 
		<samp class="codeph">filter_t</samp> to perform parallel formatting. The example reads
		a text file, squares each decimal numeral in the text, and writes the modified
		text to a new file. Below is a picture of the pipeline. 
	 </p>
 
	 <div class="Note"><h3 class="NoteTipHead">
					Caution</h3> Because the body object provided to the filters of
		the 
		<samp class="codeph">parallel_pipeline</samp> might be copied, its 
		<samp class="codeph">operator()</samp> should not modify the body. Otherwise the
		modification might or might not become visible to the thread that invoked 
		<samp class="codeph">parallel_pipeline</samp>, depending upon whether 
		<samp class="codeph">operator()</samp> is acting on the original or a copy. As a
		reminder of this nuance, 
		<samp class="codeph">parallel_pipeline</samp> requires that the body object's 
		<samp class="codeph">operator()</samp> be declared 
		<samp class="codeph">const</samp>. 
	 </div> 
	 
<div class="tablenoborder"><table cellpadding="4" summary="" frame="void" border="1" rules="none" cellspacing="2"> 
		  <tbody> 
			 <tr> 
				<td class="noborder" valign="top"> 
				  <p>Read chunk<br> from input file 
				  </p>
 
				</td>
 
				<td class="noborder" valign="top"><br><img src="Images/image010.jpg" width="31" height="26"><br> 
				</td>
 
				<td class="noborder" valign="top"> 
				  <p>Square numerals 
					 <br> in chunk 
				  </p>
 
				</td>
 
				<td class="noborder" valign="top"><br><img src="Images/image010.jpg" width="31" height="26"><br> 
				</td>
 
				<td class="noborder" valign="top"> 
				  <p>Write chunk 
					 <br> to output file 
				  </p>
 
				</td>
 
			 </tr>
 
		  </tbody>
 
		</table>
</div>
 
	 <p>Assume that the raw file I/O is sequential. The squaring filter can be
		done in parallel. That is, if you can serially read 
		<var>n</var> chunks very quickly, you can transform each of the 
		<var>n</var> chunks in parallel, as long as they are written in
		the proper order to the output file. Though the raw I/O is sequential, the
		formatting of input and output can be moved to the middle filter, and thus be
		parallel. 
	 </p>
 
	 <p>To amortize parallel scheduling overheads, the filters operate on chunks
		of text. Each input chunk is approximately 4000 characters. Each chunk is
		represented by an instance of class 
		<samp class="codeph">TextSlice</samp>: 
	 </p>
 
	 <pre>// Holds a slice of text.
/** Instances *must* be allocated/freed using methods herein, because the C++ declaration
   represents only the header of a much larger object in memory. */
class TextSlice {
    // Pointer to one past last character in sequence
    char* logical_end;
    // Pointer to one past last available byte in sequence.
    char* physical_end;
public:
    // Allocate a TextSlice object that can hold up to max_size characters.
    static TextSlice* allocate( size_t max_size ) {
        // +1 leaves room for a terminating null character.
        TextSlice* t = (TextSlice*)tbb::tbb_allocator&lt;char&gt;().allocate( sizeof(TextSlice)+max_size+1 );
        t-&gt;logical_end = t-&gt;begin();
        t-&gt;physical_end = t-&gt;begin()+max_size;
        return t;
    }
    // Free this TextSlice object
    void free() {
        tbb::tbb_allocator&lt;char&gt;().deallocate((char*)this, sizeof(TextSlice)+(physical_end-begin())+1);
    }
    // Pointer to beginning of sequence
    char* begin() {return (char*)(this+1);}
    // Pointer to one past last character in sequence
    char* end() {return logical_end;}
    // Length of sequence
    size_t size() const {return logical_end-(char*)(this+1);}
    // Maximum number of characters that can be appended to sequence
    size_t avail() const {return physical_end-logical_end;}
    // Append sequence [first,last) to this sequence.
    void append( char* first, char* last ) {
        memcpy( logical_end, first, last-first );
        logical_end += last-first;
    }
    // Set end() to given value.
    void set_end( char* p ) {logical_end=p;}
};</pre> 
	 <p>Below is the top-level code for building and running the pipeline. 
		<samp class="codeph">TextSlice</samp> objects are passed between filters using
		pointers to avoid the overhead of copying a 
		<samp class="codeph">TextSlice</samp>. 
	 </p>
 
	 <pre>void RunPipeline( int ntoken, FILE* input_file, FILE* output_file ) {
    tbb::parallel_pipeline(
        ntoken,
        tbb::make_filter&lt;void,TextSlice*&gt;(
            tbb::filter::serial_in_order, MyInputFunc(input_file) )
    &amp;
        tbb::make_filter&lt;TextSlice*,TextSlice*&gt;(
            tbb::filter::parallel, MyTransformFunc() )
    &amp;
        tbb::make_filter&lt;TextSlice*,void&gt;(
            tbb::filter::serial_in_order, MyOutputFunc(output_file) ) );
} </pre> 
	 <p>The parameter 
		<samp class="codeph"><em>ntoken</em></samp> to function 
		<samp class="codeph">parallel_pipeline</samp> controls the level of parallelism.
		Conceptually, tokens flow through the pipeline. In a serial in-order filter,
		each token must be processed serially in order. In a parallel filter, multiple
		tokens can be processed in parallel by the filter. If the number of tokens were
		unlimited, the parallel filter in the middle could keep accumulating tokens
		because the output filter cannot keep up. This situation
		typically leads to undesirable resource consumption by the middle filter. The 
		<samp class="codeph"><em>ntoken</em></samp> parameter therefore specifies the maximum number of tokens
		that can be in flight. Once this limit is reached, the pipeline does not create a
		new token at the input filter until another token is destroyed at the output
		filter. 
	 </p>
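 
	 <p>The effect of the token limit can be illustrated with a toy,
		single-threaded model. This is only a sketch and not Intel&reg; TBB code: the
		function 
		<samp class="codeph">peak_in_flight</samp> and its rate parameters are
		hypothetical names introduced for illustration. It assumes that the output
		stage retires a fixed number of items per tick and that the input stage can
		admit items faster than that. 
	 </p>
 
	 <pre>#include &lt;algorithm&gt;

// Toy model (assumption: not how Intel TBB schedules work internally).
// Each tick the output stage retires up to output_rate items, then the
// input stage admits up to input_rate new items, but only while fewer
// than ntoken items are in flight.
// Requires ntoken, input_rate and output_rate to be at least 1.
// Returns the peak number of items in flight.
int peak_in_flight( int ntoken, int n_items, int input_rate, int output_rate ) {
    int remaining = n_items;   // items not yet admitted by the input stage
    int in_flight = 0;         // items admitted but not yet retired
    int peak = 0;
    while( remaining&gt;0 || in_flight&gt;0 ) {
        // Output stage: each retired item frees one token.
        in_flight -= std::min( output_rate, in_flight );
        // Input stage: create new tokens only while below the limit.
        for( int k=0; k&lt;input_rate &amp;&amp; in_flight&lt;ntoken &amp;&amp; remaining&gt;0; ++k ) {
            ++in_flight;
            --remaining;
        }
        peak = std::max( peak, in_flight );
    }
    return peak;
}</pre> 
	 <p>In this model, with an input rate of 3 and an output rate of 1, a limit of
		4 tokens keeps at most 4 items in flight, whereas a very large limit lets
		the backlog grow with the length of the input. 
	 </p>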
 
	 <p>The second parameter specifies the sequence of filters. Each filter is
		constructed by function 
		<samp class="codeph">make_filter&lt;</samp><em>inputType</em><samp class="codeph"><em>,</em></samp><em>outputType</em><samp class="codeph">&gt;(</samp><em>mode</em><samp class="codeph">,</samp><em>functor</em>).
		
	 </p>
 
	 <ul type="disc"> 
		<li> 
		  <p>The 
			 <em>inputType</em> specifies the type of values input by a filter. For
			 the input filter, the type is 
			 <samp class="codeph">void</samp>. 
		  </p>
 
		</li>
 
		<li> 
		  <p>The 
			 <em>outputType</em> specifies the type of values output by a filter.
			 For the output filter, the type is 
			 <samp class="codeph">void</samp>. 
		  </p>
 
		</li>
 
		<li> 
		  <p>The 
			 <em>mode</em> specifies whether the filter processes items in parallel,
			 serial in-order, or serial out-of-order. 
		  </p>
 
		</li>
 
		<li> 
		  <p>The 
			 <em>functor</em> specifies how to produce an output value from an input
			 value. 
		  </p>
 
		</li>
 
	 </ul>
 
	 <p>The filters are concatenated with 
		<samp class="codeph">operator&amp;</samp>. When concatenating two filters, the 
		<em>outputType</em> of the first filter must match the 
		<em>inputType</em> of the second filter. 
	 </p>
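 
	 <p>The type-matching rule for concatenation can be sketched in plain C++.
		The 
		<samp class="codeph">Stage</samp> template below is a hypothetical stand-in
		written for this illustration, not the 
		<samp class="codeph">filter_t</samp> class, and it ignores the 
		<samp class="codeph">void</samp> input and output cases. Its 
		<samp class="codeph">operator&amp;</samp> compiles only when the output type of
		the left operand is the input type of the right operand. 
	 </p>
 
	 <pre>#include &lt;cstddef&gt;
#include &lt;functional&gt;
#include &lt;string&gt;

// Hypothetical stand-in for a typed filter; not the tbb::filter_t class.
template&lt;typename In, typename Out&gt;
struct Stage {
    std::function&lt;Out(In)&gt; fn;
};

// Concatenation compiles only when the left stage's output type B
// is the right stage's input type.
template&lt;typename A, typename B, typename C&gt;
Stage&lt;A,C&gt; operator&amp;( const Stage&lt;A,B&gt;&amp; left, const Stage&lt;B,C&gt;&amp; right ) {
    return Stage&lt;A,C&gt;{ [left,right]( A a ) { return right.fn( left.fn(a) ); } };
}

// Example chain: int -&gt; std::string -&gt; std::size_t.
// Returns the number of decimal digits in x*x.
inline std::size_t digits_in_square( int x ) {
    Stage&lt;int,std::string&gt; to_text{
        []( int v ) { return std::to_string( (long long)v*v ); } };
    Stage&lt;std::string,std::size_t&gt; length{
        []( std::string s ) { return s.size(); } };
    Stage&lt;int,std::size_t&gt; both = to_text &amp; length;
    return both.fn(x);
}</pre> 
	 <p>Swapping the operands, as in 
		<samp class="codeph">length &amp; to_text</samp>, fails to compile in this sketch
		because no single type can be deduced for the connecting parameter. 
	 </p>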
 
	 <p>The filters can be constructed and concatenated ahead of time. An
		equivalent version of the previous example that does this follows: 
	 </p>
 
	 <pre>void RunPipeline( int ntoken, FILE* input_file, FILE* output_file ) {
    tbb::filter_t&lt;void,TextSlice*&gt; f1( tbb::filter::serial_in_order, 
                                       MyInputFunc(input_file) );
    tbb::filter_t&lt;TextSlice*,TextSlice*&gt; f2(tbb::filter::parallel, 
                                            MyTransformFunc() );
    tbb::filter_t&lt;TextSlice*,void&gt; f3(tbb::filter::serial_in_order, 
                                      MyOutputFunc(output_file) );
    tbb::filter_t&lt;void,void&gt; f = f1 &amp; f2 &amp; f3;
    tbb::parallel_pipeline(ntoken,f);
}</pre> 
	 <p>The input filter must be 
		<samp class="codeph">serial_in_order</samp> in this example because the filter reads
		chunks from a sequential file and the output filter must write the chunks in
		the same order. All 
		<samp class="codeph">serial_in_order</samp> filters process items in the same order.
		Thus if an item arrives at 
		<samp class="codeph">MyOutputFunc</samp> out of the order established by 
		<samp class="codeph">MyInputFunc</samp>, the pipeline automatically delays invoking 
		<samp class="codeph">MyOutputFunc::operator()</samp> on the item until its
		predecessors are processed. There is another kind of serial filter, 
		<samp class="codeph">serial_out_of_order</samp>, that does not preserve order. 
	 </p>
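 
	 <p>The delaying behavior described above can be pictured as a reorder
		buffer. The following is a minimal sketch under the assumption that each
		item carries a sequence number assigned by the input stage. The class 
		<samp class="codeph">InOrderEmitter</samp> is a hypothetical illustration and
		does not show how Intel&reg; TBB implements ordering internally. 
	 </p>
 
	 <pre>#include &lt;cstddef&gt;
#include &lt;map&gt;
#include &lt;string&gt;
#include &lt;vector&gt;

// Hypothetical reorder buffer; not Intel TBB's implementation.
class InOrderEmitter {
    std::size_t next_seq = 0;                     // sequence number to emit next
    std::map&lt;std::size_t,std::string&gt; pending;    // items that arrived early
public:
    // Accepts item number seq and returns the items that can now be
    // emitted in order (possibly none).
    std::vector&lt;std::string&gt; push( std::size_t seq, const std::string&amp; item ) {
        pending[seq] = item;
        std::vector&lt;std::string&gt; ready;
        std::map&lt;std::size_t,std::string&gt;::iterator it;
        while( (it = pending.find(next_seq)) != pending.end() ) {
            ready.push_back( it-&gt;second );
            pending.erase( it );
            ++next_seq;
        }
        return ready;
    }
};

// Feeds three chunks that arrive in the order 2, 0, 1 and returns the
// order in which they are emitted.
inline std::vector&lt;std::string&gt; demo_reorder() {
    InOrderEmitter out;
    std::vector&lt;std::string&gt; emitted;
    const std::size_t arrival[] = {2, 0, 1};
    const char* text[] = {"chunk0", "chunk1", "chunk2"};
    for( std::size_t seq : arrival )
        for( const std::string&amp; s : out.push( seq, text[seq] ) )
            emitted.push_back( s );
    return emitted;
}</pre> 
	 <p>Although chunk 2 arrives first, it is held back until chunks 0 and 1 have
		been emitted. 
	 </p>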
 
	 <p>The middle filter operates on purely local data. Thus any number of
		invocations of its functor can run concurrently. Hence it is specified as a
		parallel filter. 
	 </p>
 
	 <p>The functors for each filter are explained in detail now. The output
		functor is the simplest. All it has to do is write a 
		<samp class="codeph">TextSlice</samp> to a file and free the 
		<samp class="codeph">TextSlice</samp>. 
	 </p>
 
	 <pre>// Functor that writes a TextSlice to a file.
class MyOutputFunc {
    FILE* my_output_file;
public:
    MyOutputFunc( FILE* output_file );
    void operator()( TextSlice* item ) const;
};
&nbsp;
MyOutputFunc::MyOutputFunc( FILE* output_file ) :
    my_output_file(output_file)
{
}
&nbsp;
void MyOutputFunc::operator()( TextSlice* out ) const {
    size_t n = fwrite( out-&gt;begin(), 1, out-&gt;size(), my_output_file );
    if( n!=out-&gt;size() ) {
        fprintf(stderr,"Can't write into file '%s'\n", OutputFileName);
        exit(1);
    }
    out-&gt;free();
} </pre> 
	 <p>Method 
		<samp class="codeph">operator()</samp> processes a 
		<samp class="codeph">TextSlice</samp>. The parameter 
		<samp class="codeph">out</samp> points to the 
		<samp class="codeph">TextSlice</samp> to be processed. Since it is used for the last
		filter of the pipeline, it returns 
		<samp class="codeph">void</samp>. 
	 </p>
 
	 <p>The functor for the middle filter is similar, but a bit more complex. It
		returns a pointer to the 
		<samp class="codeph">TextSlice</samp> that it produces. 
	 </p>
 
	 <pre>// Functor that changes each decimal number to its square.
class MyTransformFunc {
public:
    TextSlice* operator()( TextSlice* input ) const;
};

TextSlice* MyTransformFunc::operator()( TextSlice* input ) const {
    // Add terminating null so that strtol works right even if number is at end of the input.
    *input-&gt;end() = '\0';
    char* p = input-&gt;begin();
    TextSlice* out = TextSlice::allocate( 2*MAX_CHAR_PER_INPUT_SLICE );
    char* q = out-&gt;begin();
    for(;;) {
        while( p&lt;input-&gt;end() &amp;&amp; !isdigit(*p) )
            *q++ = *p++;
        if( p==input-&gt;end() )
            break;
        long x = strtol( p, &amp;p, 10 );
        // Note: no overflow checking is needed here, as we have twice the
        // input string length, but the square of a non-negative integer n
        // cannot have more than twice as many digits as n.
        long y = x*x;
        sprintf(q,"%ld",y);
        q = strchr(q,0);
    }
    out-&gt;set_end(q);
    input-&gt;free();
    return out;
} </pre> 
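	 <p>To experiment with the transformation in isolation, the same idea can be
		written against 
		<samp class="codeph">std::string</samp>. This is a standalone sketch, not the
		functor used by the example: the function name 
		<samp class="codeph">square_numerals</samp> is hypothetical, and it assumes
		that the square of every numeral fits in an 
		<samp class="codeph">unsigned long long</samp>. 
	 </p>
 
	 <pre>#include &lt;cctype&gt;
#include &lt;cstddef&gt;
#include &lt;string&gt;

// Standalone sketch of the transform step on a std::string.
// Assumes every numeral is small enough that its square fits in
// unsigned long long; no arithmetic overflow checking is done.
std::string square_numerals( const std::string&amp; in ) {
    std::string out;
    std::size_t i = 0;
    while( i&lt;in.size() ) {
        if( !std::isdigit( static_cast&lt;unsigned char&gt;(in[i]) ) ) {
            out += in[i++];              // copy non-digit characters unchanged
            continue;
        }
        unsigned long long x = 0;
        while( i&lt;in.size() &amp;&amp; std::isdigit( static_cast&lt;unsigned char&gt;(in[i]) ) )
            x = x*10 + (in[i++]-'0');    // accumulate the decimal value
        out += std::to_string( x*x );    // replace the numeral by its square
    }
    return out;
}</pre> 
	 <p>For example, 
		<samp class="codeph">square_numerals("a12b3")</samp> yields 
		<samp class="codeph">"a144b9"</samp>. 
	 </p>
 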
	 <p>The input functor is the most complicated, because it has to ensure that
		no numeral crosses a boundary. When it finds what could be a numeral crossing
		into the next slice, it copies the partial numeral to the next slice.
		Furthermore, it has to indicate when the end of input is reached. It does this
		by invoking method 
		<samp class="codeph">stop()</samp> on a special argument of type 
		<samp class="codeph">flow_control</samp>. This idiom is required for any functor
		used for the first filter of a pipeline. It is shown in 
		<samp class="codeph"><span style="color:blue"><strong>bold font</strong></span></samp> in
		the following code for the functor: 
	 </p>
 
	 <pre>TextSlice* next_slice = NULL;

class MyInputFunc {
public:
    MyInputFunc( FILE* input_file_ );
    MyInputFunc( const MyInputFunc&amp; f ) : input_file(f.input_file) { }
    ~MyInputFunc();
    TextSlice* operator()( tbb::flow_control&amp; fc ) const;
private:
    FILE* input_file;
};
 
MyInputFunc::MyInputFunc( FILE* input_file_ ) :
    input_file(input_file_) { }
 
MyInputFunc::~MyInputFunc() {
}
 
TextSlice* MyInputFunc::operator()( <span style="color:blue"><strong>tbb::flow_control&amp; fc</strong></span> ) const {
    // Read characters into space that is available in the next slice.
    if( !next_slice )
        next_slice = TextSlice::allocate( MAX_CHAR_PER_INPUT_SLICE );
    size_t m = next_slice-&gt;avail();
    size_t n = fread( next_slice-&gt;end(), 1, m, input_file );
    if( !n &amp;&amp; next_slice-&gt;size()==0 ) {
        // No more characters to process
        <span style="color:blue"><strong>fc.stop()</strong></span>;
        return NULL;
    } else {
        // Have more characters to process.
        TextSlice* t = next_slice;
        next_slice = TextSlice::allocate( MAX_CHAR_PER_INPUT_SLICE );
        char* p = t-&gt;end()+n;
        if( n==m ) {
            // Might have read partial number.  
            // If so, transfer characters of partial number to next slice.
            while( p&gt;t-&gt;begin() &amp;&amp; isdigit(p[-1]) )
                --p;
            assert(p&gt;t-&gt;begin() &amp;&amp; "Number too large to fit in buffer.");
            next_slice-&gt;append( p, t-&gt;end()+n );
        }
        t-&gt;set_end(p);
        return t;
    }
}</pre> 
	 <p>The copy constructor must be defined because the functor is copied when
		the <samp class="codeph">filter_t</samp> is built from the functor, and again when the pipeline runs. 
	 </p>
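 
	 <p>The boundary handling in the input functor can also be studied without
		file I/O. The sketch below applies the same rule to an in-memory string:
		whenever a chunk is filled completely, any trailing digits are moved to the
		next chunk. The function 
		<samp class="codeph">split_chunks</samp> is a hypothetical helper written for
		this illustration, and it assumes that every numeral is shorter than the
		chunk size. 
	 </p>
 
	 <pre>#include &lt;cctype&gt;
#include &lt;cstddef&gt;
#include &lt;stdexcept&gt;
#include &lt;string&gt;
#include &lt;vector&gt;

// Splits text into chunks of at most max_chars characters such that no run
// of digits is divided between two chunks.
// Assumes every numeral is shorter than max_chars; throws otherwise.
std::vector&lt;std::string&gt; split_chunks( const std::string&amp; text, std::size_t max_chars ) {
    std::vector&lt;std::string&gt; chunks;
    std::size_t begin = 0;
    while( begin&lt;text.size() ) {
        std::size_t end = begin+max_chars;
        if( end&gt;=text.size() ) {
            end = text.size();       // last chunk: nothing can cross its end
        } else {
            // Back up over trailing digits; they might continue past the cut.
            while( end&gt;begin &amp;&amp; std::isdigit( static_cast&lt;unsigned char&gt;(text[end-1]) ) )
                --end;
            if( end==begin )
                throw std::length_error( "Number too large to fit in a chunk." );
        }
        chunks.push_back( text.substr( begin, end-begin ) );
        begin = end;                 // deferred digits start the next chunk
    }
    return chunks;
}</pre> 
	 <p>With a chunk size of 5, the text 
		<samp class="codeph">"ab 123 45"</samp> is split into 
		<samp class="codeph">"ab "</samp>, 
		<samp class="codeph">"123 "</samp>, and 
		<samp class="codeph">"45"</samp>, so each numeral stays whole. 
	 </p>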
 
	 <p>The 
		<samp class="codeph">parallel_pipeline</samp> syntax is new in Intel&reg; TBB 3.0. The
		directory 
		<samp class="codeph">examples/pipeline/square</samp> contains the complete code for
		the squaring example in an older lower-level syntax where the filters are
		defined via inheritance. The Reference manual describes both syntaxes. 
	 </p>
 
  </div>
 

<div class="familylinks">
<div class="parentlink"><strong>Parent topic:</strong>&nbsp;<a href="../tbb_userguide/Parallelizing_Complex_Loops.htm">Parallelizing Complex Loops</a></div>
</div>
<div>
<ul class="ullinks">
<li class="ulchildlink"><a href="../tbb_userguide/Using_Circular_Buffers.htm">Using Circular Buffers</a><br>
</li>
<li class="ulchildlink"><a href="../tbb_userguide/Throughput_of_pipeline.htm">Throughput of pipeline</a><br>
</li>
<li class="ulchildlink"><a href="../tbb_userguide/Non-Linear_Pipelines.htm">Non-Linear Pipelines</a><br>
</li>
<li class="ulchildlink"><a href="../tbb_userguide/Summary_of_Loops_and_Pipelines.htm">Summary of Loops and Pipelines</a><br>
</li>
</ul>
</div>

</body>
</html>
